package org.springblade.resource.hadoop;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Executor;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import lombok.extern.slf4j.Slf4j;

/**
 * @author Ruison
 * on 2019/7/4 - 10:21
 */
@Configuration
@ConditionalOnProperty(name="hadoop.enabled",havingValue = "true")
@EnableAsync
@Slf4j
public class HadoopConfig {

    /**
     * 配置
     */
    public org.apache.hadoop.conf.Configuration getConfiguration(HadoopProperties hadoopProperties) {
        //读取配置文件
    	org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
    	try {
		
		        if (hadoopProperties.isAuthentication()){
		            System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,hadoopProperties.getKrb5ConfPath());
		            configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos");
		           
		            UserGroupInformation.setConfiguration(configuration);
		            UserGroupInformation.loginUserFromKeytab(hadoopProperties.getKeytabUsername(),
		            		hadoopProperties.getKeytabPath());
		        }
		        
		        String defaultFS = configuration.get(Constants.FS_DEFAULTFS);
		        //first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
		        // the default is the local file system
		        if (defaultFS.startsWith("file")) {
		            String defaultFSProp = hadoopProperties.getNameNode();
		            if (StringUtils.isNotBlank(defaultFSProp)) {
//		                Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("fs.");
		                configuration.set(Constants.FS_DEFAULTFS, defaultFSProp);
//		                fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
		            } else {
		                log.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS);
		                throw new RuntimeException(
		                        String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS)
		                );
		            }
		        } else {
		       	 log.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS);
		        }
		
		} catch (Exception e) {
			 log.error(e.getMessage(), e);
		}
        return configuration;
    }
    void t() {
    	



             
    }
    @Bean
    public FileSystem fs(HadoopProperties hadoopProperties){
        // 文件系统
    	 FileSystem fs = null ;
        try {
            
            if (fs == null) {
            	String hdfsUser=hadoopProperties.getUser();
            	URI uri = new URI(hadoopProperties.getDirectoryPath().trim());
            	org.apache.hadoop.conf.Configuration  conf = this.getConfiguration(hadoopProperties);
	            if (StringUtils.isNotEmpty(hdfsUser)) {
	                UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser);
	                fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
	                    @Override
	                    public FileSystem run() throws Exception {
	                    	return  FileSystem.get(uri,conf);
	                    }
	                });
	            } else {
	           	 log.warn("hdfs.root.user is not set value!");
	           	 fs = FileSystem.get(uri, conf);
	            }
	        }
        } catch (Exception e) {
            log.error("【FileSystem配置初始化失败】", e);
        }
        return fs;
    }

    @Bean
    @ConditionalOnBean(FileSystem.class)
    public HadoopClient hadoopClient(FileSystem fs, HadoopProperties hadoopProperties) {
        return new HadoopClient(fs, hadoopProperties);
    }
    
    @Bean (name = "taskExecutor")
    public Executor taskExecutor() {
        log.debug("Creating Async Task Executor");
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("PutinThread-");
        executor.initialize();
        return executor;
    }
}
